---
title: "并发和异步编程 - async/await、Actors"
description: "学习 Swift 并发：async/await、actors、结构化并发、任务组，与 JavaScript 对比"
---

# 并发和异步编程：async/await、Actors

在本模块中，我们探索 Swift 的现代并发系统，包括 async/await、actors、结构化并发和任务组。我们将这些方法与 JavaScript 的基于 Promise 和回调的并发模式进行对比。

## 目录
- [介绍：并发模型](#介绍并发模型)
- [Async/Await 基础](#asyncawait-基础)
- [结构化并发](#结构化并发)
- [Actors](#actors)
- [任务组](#任务组)
- [异步序列](#异步序列)
- [高级并发](#高级并发)
- [练习题](#练习题)
- [关键要点](#关键要点)

## 介绍：并发模型

Swift 的现代并发系统相比传统线程和 JavaScript 的事件循环提供了更安全、更高效的方法。

| 特性               | JavaScript | Swift |
|--------------------|------------|-------|
| Async/Await        | ES2017     | Swift 5.5+ |
| 结构化并发         | 否         | 是    |
| Actors             | 否         | 是    |
| 任务组             | 手动       | 内置   |
| 取消               | 手动（如 AbortController） | 协作式，沿任务树自动传播 |
| 数据竞争           | 可能       | 预防   |
| 内存安全           | 运行时     | 编译时 |

## Async/Await 基础

<UniversalEditor title="Async/Await 对比" compare={true}>
```javascript !! js
// JavaScript：基于 Promise 的 async/await
function fetchUserData(id) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            if (id > 0) {
                resolve({ id, name: "张三", email: "zhang@example.com" });
            } else {
                reject(new Error("用户未找到"));
            }
        }, 1000);
    });
}

function fetchUserPosts(userId) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            if (userId > 0) {
                resolve([{ id: 1, title: "第一篇帖子" }]);
            } else {
                reject(new Error("帖子未找到"));
            }
        }, 500);
    });
}

async function loadUserProfile(userId) {
    try {
        console.log("正在加载用户数据...");
        const user = await fetchUserData(userId);
        console.log("正在加载用户帖子...");
        const posts = await fetchUserPosts(user.id);
        return { user, posts };
    } catch (error) {
        console.error("加载资料失败:", error.message);
        throw error;
    }
}

// 使用
loadUserProfile(1)
    .then(profile => console.log("资料已加载:", profile))
    .catch(error => console.error("错误:", error.message));
```

```swift !! swift
// Swift：原生 async/await
/// Simulates a one-second lookup and returns the user for `id`.
///
/// - Parameter id: Identifier to look up; must be greater than zero.
/// - Returns: A `User` carrying the requested identifier.
/// - Throws: `NetworkError.userNotFound` when `id` is not positive; also
///   rethrows anything thrown by `Task.sleep` while waiting.
func fetchUserData(id: Int) async throws -> User {
    let simulatedLatency: UInt64 = 1_000_000_000 // nanoseconds, i.e. 1 second
    try await Task.sleep(nanoseconds: simulatedLatency)

    if id <= 0 {
        throw NetworkError.userNotFound
    }
    return User(id: id, name: "张三", email: "zhang@example.com")
}

/// Simulates a half-second lookup and returns the posts of `userId`.
///
/// - Parameter userId: Owner of the posts; must be greater than zero.
/// - Returns: A single-element array holding the sample post.
/// - Throws: `NetworkError.postsNotFound` when `userId` is not positive; also
///   rethrows anything thrown by `Task.sleep` while waiting.
func fetchUserPosts(userId: Int) async throws -> [Post] {
    let simulatedLatency: UInt64 = 500_000_000 // nanoseconds, i.e. 0.5 seconds
    try await Task.sleep(nanoseconds: simulatedLatency)

    if userId <= 0 {
        throw NetworkError.postsNotFound
    }
    let samplePost = Post(id: 1, title: "第一篇帖子")
    return [samplePost]
}

/// Loads a user and then that user's posts, one after the other.
///
/// The posts request uses the id of the user that was actually returned, so
/// the two calls are intentionally sequential.
///
/// - Parameter userId: Identifier of the user to load.
/// - Returns: The loaded user together with the user's posts.
/// - Throws: Whatever `fetchUserData` or `fetchUserPosts` throws.
func loadUserProfile(userId: Int) async throws -> (user: User, posts: [Post]) {
    print("正在加载用户数据...")
    let loadedUser = try await fetchUserData(id: userId)

    print("正在加载用户帖子...")
    let loadedPosts = try await fetchUserPosts(userId: loadedUser.id)

    return (user: loadedUser, posts: loadedPosts)
}

// 使用
Task {
    do {
        let profile = try await loadUserProfile(userId: 1)
        print("资料已加载:", profile)
    } catch {
        print("错误:", error)
    }
}
```
</UniversalEditor>

### 并发执行

<UniversalEditor title="并发执行" compare={true}>
```javascript !! js
// JavaScript：使用 Promise.all 并发执行
async function loadUserDataConcurrently(userId) {
    try {
        const [user, posts, settings] = await Promise.all([
            fetchUserData(userId),
            fetchUserPosts(userId),
            fetchUserSettings(userId)
        ]);
        
        return { user, posts, settings };
    } catch (error) {
        console.error("加载数据失败:", error.message);
        throw error;
    }
}

function fetchUserSettings(userId) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            if (userId > 0) {
                resolve({ theme: "dark", notifications: true });
            } else {
                reject(new Error("设置未找到"));
            }
        }, 300);
    });
}

// 使用
loadUserDataConcurrently(1)
    .then(data => console.log("所有数据已加载:", data))
    .catch(error => console.error("错误:", error.message));
```

```swift !! swift
// Swift：使用 async let 并发执行
/// Fetches user, posts and settings as three concurrent child tasks.
///
/// Each `async let` starts its request immediately; the function suspends
/// only at the final `try await`, where all three results are collected.
///
/// - Parameter userId: Identifier passed to all three fetch functions.
/// - Returns: A labeled tuple of the three results.
/// - Throws: The error of a failing fetch, surfaced when the results are awaited.
func loadUserDataConcurrently(userId: Int) async throws -> (user: User, posts: [Post], settings: UserSettings) {
    async let pendingUser = fetchUserData(id: userId)
    async let pendingPosts = fetchUserPosts(userId: userId)
    async let pendingSettings = fetchUserSettings(userId: userId)

    let loaded = try await (user: pendingUser, posts: pendingPosts, settings: pendingSettings)
    return loaded
}

/// Simulates a 0.3-second lookup and returns the settings of `userId`.
///
/// - Parameter userId: Owner of the settings; must be greater than zero.
/// - Returns: Settings with the dark theme and notifications enabled.
/// - Throws: `NetworkError.settingsNotFound` when `userId` is not positive;
///   also rethrows anything thrown by `Task.sleep` while waiting.
func fetchUserSettings(userId: Int) async throws -> UserSettings {
    let simulatedLatency: UInt64 = 300_000_000 // nanoseconds, i.e. 0.3 seconds
    try await Task.sleep(nanoseconds: simulatedLatency)

    if userId <= 0 {
        throw NetworkError.settingsNotFound
    }
    return UserSettings(theme: "dark", notifications: true)
}

struct UserSettings {
    let theme: String
    let notifications: Bool
}

// 使用
Task {
    do {
        let data = try await loadUserDataConcurrently(userId: 1)
        print("所有数据已加载:", data)
    } catch {
        print("错误:", error)
    }
}
```
</UniversalEditor>

## 结构化并发

Swift 的结构化并发确保子任务在父任务被取消时得到适当管理和取消。

<UniversalEditor title="结构化并发" compare={true}>
```javascript !! js
// JavaScript：手动任务管理
/**
 * Tracks in-flight promises so they can be inspected or "cancelled" together.
 */
class TaskManager {
    constructor() {
        // Promises that have been started but have not settled yet.
        this.tasks = new Set();
    }

    /**
     * Starts `taskFn`, tracks its promise until it settles, and forwards the outcome.
     * @param {() => Promise<*>} taskFn factory that starts the work
     * @returns {Promise<*>} the task's result; rejects with the task's error
     */
    async runTask(taskFn) {
        const pending = taskFn();
        this.tasks.add(pending);

        try {
            return await pending;
        } finally {
            // Stop tracking whether the task fulfilled or rejected.
            this.tasks.delete(pending);
        }
    }

    /**
     * Forgets every tracked task.
     * JavaScript has no built-in task cancellation, so this simplified example
     * only clears the bookkeeping; the underlying work keeps running.
     */
    async cancelAll() {
        const placeholders = [...this.tasks].map(() => Promise.resolve());
        await Promise.all(placeholders);
        this.tasks.clear();
    }
}

const manager = new TaskManager();

async function processData() {
    const task1 = manager.runTask(async () => {
        await new Promise(resolve => setTimeout(resolve, 2000));
        return "任务 1 完成";
    });
    
    const task2 = manager.runTask(async () => {
        await new Promise(resolve => setTimeout(resolve, 1000));
        return "任务 2 完成";
    });
    
    try {
        const [result1, result2] = await Promise.all([task1, task2]);
        console.log(result1, result2);
    } catch (error) {
        console.error("错误:", error);
    }
}

processData();
```

```swift !! swift
// Swift：带自动取消的结构化并发
/// Runs two `performTask` calls as concurrent child tasks and prints both results.
///
/// Any error thrown while awaiting the pair is caught and printed instead of
/// being propagated, which is why this function is not marked `throws`.
func processData() async {
    // Both child tasks start here, before either result is awaited.
    async let task1 = performTask(name: "任务 1", duration: 2.0)
    async let task2 = performTask(name: "任务 2", duration: 1.0)
    
    do {
        // Suspends until both results are available (or one of them throws).
        let (result1, result2) = try await (task1, task2)
        print(result1, result2)
    } catch {
        print("错误:", error)
    }
}

/// Sleeps for `duration` seconds and then reports completion of the named task.
///
/// - Parameters:
///   - name: Label interpolated into the returned message.
///   - duration: Sleep time in seconds; converted to nanoseconds for `Task.sleep`.
/// - Returns: The task name followed by the completion suffix.
/// - Throws: Whatever `Task.sleep` or `Task.checkCancellation()` throws.
func performTask(name: String, duration: TimeInterval) async throws -> String {
    try await Task.sleep(nanoseconds: UInt64(duration * 1_000_000_000))
    
    // Check for cancellation before producing a result.
    // NOTE(review): Task.sleep is documented to throw on cancellation as well,
    // so this explicit check is likely redundant here — kept for illustration.
    try Task.checkCancellation()
    
    return "\(name) 完成"
}

// 带取消的使用
Task {
    let task = Task {
        await processData()
    }
    
    // 1.5 秒后取消
    try await Task.sleep(nanoseconds: 1_500_000_000)
    task.cancel()
}
```
</UniversalEditor>

## Actors

Swift actors 提供对可变状态的线程安全访问，防止数据竞争。

<UniversalEditor title="Actors 对比" compare={true}>
```javascript !! js
// JavaScript：手动同步和锁
class BankAccount {
    constructor(initialBalance) {
        this.balance = initialBalance;
        this.lock = false;
    }
    
    async withdraw(amount) {
        // 手动锁实现
        while (this.lock) {
            await new Promise(resolve => setTimeout(resolve, 10));
        }
        
        this.lock = true;
        try {
            if (this.balance >= amount) {
                this.balance -= amount;
                return { success: true, newBalance: this.balance };
            } else {
                return { success: false, error: "余额不足" };
            }
        } finally {
            this.lock = false;
        }
    }
    
    async deposit(amount) {
        while (this.lock) {
            await new Promise(resolve => setTimeout(resolve, 10));
        }
        
        this.lock = true;
        try {
            this.balance += amount;
            return { success: true, newBalance: this.balance };
        } finally {
            this.lock = false;
        }
    }
    
    async getBalance() {
        while (this.lock) {
            await new Promise(resolve => setTimeout(resolve, 10));
        }
        
        this.lock = true;
        try {
            return this.balance;
        } finally {
            this.lock = false;
        }
    }
}

const account = new BankAccount(1000);

async function performTransactions() {
    const [result1, result2] = await Promise.all([
        account.withdraw(500),
        account.deposit(200)
    ]);
    
    console.log("取款结果:", result1);
    console.log("存款结果:", result2);
    console.log("最终余额:", await account.getBalance());
}

performTransactions();
```

```swift !! swift
// Swift：带自动同步的 Actors
/// An account whose balance is protected by actor isolation.
///
/// All reads and writes of `balance` go through the actor's methods, so
/// callers outside the actor must `await` them.
actor BankAccount {
    private var balance: Double

    init(initialBalance: Double) {
        balance = initialBalance
    }

    /// Removes `amount` from the balance when enough funds are available.
    ///
    /// - Returns: On success the new balance and a `nil` error; otherwise the
    ///   unchanged balance and an error message.
    func withdraw(amount: Double) async -> (success: Bool, newBalance: Double, error: String?) {
        let hasEnoughFunds = balance >= amount
        if !hasEnoughFunds {
            return (success: false, newBalance: balance, error: "余额不足")
        }

        balance -= amount
        return (success: true, newBalance: balance, error: nil)
    }

    /// Adds `amount` to the balance and reports the new balance.
    func deposit(amount: Double) async -> (success: Bool, newBalance: Double) {
        balance += amount
        return (success: true, newBalance: balance)
    }

    /// Returns the current balance.
    func getBalance() async -> Double {
        balance
    }
}

let account = BankAccount(initialBalance: 1000)

func performTransactions() async {
    async let withdrawResult = account.withdraw(amount: 500)
    async let depositResult = account.deposit(amount: 200)
    
    let (withdraw, deposit) = await (withdrawResult, depositResult)
    
    print("取款结果:", withdraw)
    print("存款结果:", deposit)
    print("最终余额:", await account.getBalance())
}

Task {
    await performTransactions()
}
```
</UniversalEditor>

## 任务组

Swift 任务组允许您管理多个并发任务，共享取消和错误处理。

<UniversalEditor title="任务组" compare={true}>
```javascript !! js
// JavaScript：手动任务组管理
class TaskGroup {
    constructor() {
        this.tasks = [];
        this.results = [];
        this.errors = [];
    }
    
    async addTask(taskFn) {
        const task = taskFn().then(
            result => this.results.push(result),
            error => this.errors.push(error)
        );
        this.tasks.push(task);
    }
    
    async waitForAll() {
        await Promise.allSettled(this.tasks);
        return {
            results: this.results,
            errors: this.errors
        };
    }
    
    async cancelAll() {
        // JavaScript 没有内置的任务取消
        this.tasks = [];
        this.results = [];
        this.errors = [];
    }
}

async function processMultipleItems(items) {
    const group = new TaskGroup();
    
    for (const item of items) {
        group.addTask(async () => {
            await new Promise(resolve => setTimeout(resolve, 100));
            return `已处理: ${item}`;
        });
    }
    
    const { results, errors } = await group.waitForAll();
    console.log("结果:", results);
    console.log("错误:", errors);
}

processMultipleItems(["A", "B", "C", "D"]);
```

```swift !! swift
// Swift：带自动管理的任务组
/// Processes every item in its own child task and prints the collected results.
///
/// Results are appended in completion order, which is not necessarily the
/// order of `items`.
///
/// - Parameter items: Values to process; an empty array prints an empty list.
func processMultipleItems(_ items: [String]) async {
    await withTaskGroup(of: String.self) { group in
        for item in items {
            group.addTask {
                // Children of a non-throwing group must not throw, so a plain
                // `try` does not compile here. `try?` discards the error that
                // `Task.sleep` raises on cancellation and still returns a value.
                try? await Task.sleep(nanoseconds: 100_000_000)
                return "已处理: \(item)"
            }
        }

        var results: [String] = []
        for await result in group {
            results.append(result)
        }

        print("结果:", results)
    }
}

// 带错误处理的任务组
/// Processes every item in its own throwing child task and prints either all
/// results or the first error encountered.
///
/// The item `"B"` is rejected with `ProcessingError.invalidItem` to demonstrate
/// failure handling.
///
/// - Parameter items: Values to process.
func processWithErrorHandling(_ items: [String]) async {
    await withThrowingTaskGroup(of: String.self) { group in
        for item in items {
            group.addTask {
                try await Task.sleep(nanoseconds: 100_000_000)

                if item == "B" {
                    throw ProcessingError.invalidItem(item)
                }

                return "已处理: \(item)"
            }
        }

        do {
            var results: [String] = []
            for try await result in group {
                results.append(result)
            }
            print("结果:", results)
        } catch {
            // The error is handled inside the group body, so the group does not
            // see a thrown error and will not cancel the remaining children on
            // its own. Cancel them explicitly so they stop doing work whose
            // results are going to be discarded anyway.
            group.cancelAll()
            print("错误:", error)
        }
    }
}

enum ProcessingError: Error {
    case invalidItem(String)
}

// 使用
Task {
    await processMultipleItems(["A", "B", "C", "D"])
    await processWithErrorHandling(["A", "B", "C", "D"])
}
```
</UniversalEditor>

## 异步序列

Swift 异步序列提供了一种遍历异步数据流的方法。

<UniversalEditor title="异步序列" compare={true}>
```javascript !! js
// JavaScript：异步迭代器和生成器
async function* generateNumbers(start, end) {
    for (let i = start; i <= end; i++) {
        await new Promise(resolve => setTimeout(resolve, 100));
        yield i;
    }
}

async function* generateUserData(userIds) {
    for (const id of userIds) {
        try {
            const user = await fetchUserData(id);
            yield user;
        } catch (error) {
            console.error(`获取用户 ${id} 失败:`, error.message);
        }
    }
}

async function processAsyncSequence() {
    // 处理数字
    for await (const num of generateNumbers(1, 5)) {
        console.log("数字:", num);
    }
    
    // 处理用户数据
    const userIds = [1, 2, 3, 4, 5];
    for await (const user of generateUserData(userIds)) {
        console.log("用户:", user.name);
    }
}

processAsyncSequence();
```

```swift !! swift
// Swift：带内置支持的异步序列
/// An asynchronous sequence that yields the integers `start...end`, pausing
/// 0.1 seconds before each element.
struct NumberGenerator: AsyncSequence {
    typealias Element = Int

    let start: Int
    let end: Int

    init(start: Int, end: Int) {
        self.start = start
        self.end = end
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(start: start, end: end)
    }

    /// Iterator state: the next value to emit and the inclusive upper bound.
    struct AsyncIterator: AsyncIteratorProtocol {
        var current: Int
        let end: Int

        init(start: Int, end: Int) {
            current = start
            self.end = end
        }

        /// Returns the next integer, or `nil` once `current` has passed `end`.
        mutating func next() async -> Int? {
            if current > end {
                return nil
            }

            // The delay is best-effort: an error thrown by the sleep is ignored.
            try? await Task.sleep(nanoseconds: 100_000_000)

            // `defer` advances the cursor after the return value has been read.
            defer { current += 1 }
            return current
        }
    }
}

/// An asynchronous sequence that fetches one `User` per id, skipping ids whose
/// fetch fails.
struct UserDataGenerator: AsyncSequence {
    typealias Element = User

    let userIds: [Int]

    func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator(userIds: userIds)
    }

    /// Iterator state: the ids to fetch and the position of the next one.
    struct AsyncIterator: AsyncIteratorProtocol {
        let userIds: [Int]
        var index = 0

        /// Returns the next successfully fetched user, or `nil` when every id
        /// has been consumed.
        ///
        /// A failed fetch is logged and skipped. Returning `nil` on failure
        /// would signal end-of-sequence to `for await` and silently drop every
        /// remaining id, unlike the JavaScript generator, which keeps going.
        mutating func next() async -> User? {
            while index < userIds.count {
                let id = userIds[index]
                index += 1

                do {
                    return try await fetchUserData(id: id)
                } catch {
                    print("获取用户 \(id) 失败:", error)
                    // Fall through to try the next id.
                }
            }

            return nil
        }
    }
}

func processAsyncSequence() async {
    // 处理数字
    for await num in NumberGenerator(start: 1, end: 5) {
        print("数字:", num)
    }
    
    // 处理用户数据
    let userIds = [1, 2, 3, 4, 5]
    for await user in UserDataGenerator(userIds: userIds) {
        print("用户:", user.name)
    }
}

Task {
    await processAsyncSequence()
}
```
</UniversalEditor>

## 高级并发

### Continuations 和回调集成

<UniversalEditor title="Continuations" compare={true}>
```javascript !! js
// JavaScript：基于 Promise 的回调集成
function legacyCallbackFunction(callback) {
    setTimeout(() => {
        callback(null, "来自回调的数据");
    }, 1000);
}

function promisifyCallback() {
    return new Promise((resolve, reject) => {
        legacyCallbackFunction((error, result) => {
            if (error) {
                reject(error);
            } else {
                resolve(result);
            }
        });
    });
}

async function useLegacyFunction() {
    try {
        const result = await promisifyCallback();
        console.log("结果:", result);
    } catch (error) {
        console.error("错误:", error);
    }
}

useLegacyFunction();
```

```swift !! swift
// Swift：用于回调集成的 Continuations
func legacyCallbackFunction(completion: @escaping (Result<String, Error>) -> Void) {
    DispatchQueue.global().asyncAfter(deadline: .now() + 1.0) {
        completion(.success("来自回调的数据"))
    }
}

/// Bridges the completion-handler based `legacyCallbackFunction` into async/await.
///
/// - Returns: The string delivered in the callback's `.success` case.
/// - Throws: The error delivered in the callback's `.failure` case.
func useLegacyFunction() async throws -> String {
    return try await withCheckedThrowingContinuation { continuation in
        legacyCallbackFunction { result in
            // Resumes the suspended caller with the callback's Result: success
            // becomes the return value, failure is thrown.
            // NOTE(review): a continuation must be resumed exactly once; this
            // relies on the legacy function invoking its completion a single time.
            continuation.resume(with: result)
        }
    }
}

// 使用
Task {
    do {
        let result = try await useLegacyFunction()
        print("结果:", result)
    } catch {
        print("错误:", error)
    }
}
```
</UniversalEditor>

### 自定义执行器

<UniversalEditor title="自定义执行器" compare={true}>
```javascript !! js
// JavaScript：自定义执行上下文
class CustomExecutor {
    constructor(concurrency = 1) {
        this.queue = [];
        this.running = 0;
        this.concurrency = concurrency;
    }
    
    async execute(task) {
        return new Promise((resolve, reject) => {
            this.queue.push({ task, resolve, reject });
            this.processQueue();
        });
    }
    
    async processQueue() {
        if (this.running >= this.concurrency || this.queue.length === 0) {
            return;
        }
        
        this.running++;
        const { task, resolve, reject } = this.queue.shift();
        
        try {
            const result = await task();
            resolve(result);
        } catch (error) {
            reject(error);
        } finally {
            this.running--;
            this.processQueue();
        }
    }
}

const executor = new CustomExecutor(2);

async function runTasks() {
    const tasks = [
        () => new Promise(resolve => setTimeout(() => resolve("任务 1"), 1000)),
        () => new Promise(resolve => setTimeout(() => resolve("任务 2"), 500)),
        () => new Promise(resolve => setTimeout(() => resolve("任务 3"), 800))
    ];
    
    const results = await Promise.all(tasks.map(task => executor.execute(task)));
    console.log("结果:", results);
}

runTasks();
```

```swift !! swift
// Swift：带结构化并发的自定义执行器
/// A small work queue that limits how many submitted tasks run at the same time.
///
/// Submitted closures are stored together with the continuation of the caller
/// waiting for them; `processQueue` dequeues and runs them while fewer than
/// `concurrency` tasks are in flight.
actor CustomExecutor {
    // Pending work, each entry paired with the continuation to resume with its outcome.
    private var queue: [(task: () async throws -> String, continuation: CheckedContinuation<String, Error>)] = []
    // Number of tasks currently executing.
    private var running = 0
    // Upper bound for `running`.
    private let concurrency: Int
    
    init(concurrency: Int = 1) {
        self.concurrency = concurrency
    }
    
    /// Enqueues `task` and suspends until it has been executed.
    ///
    /// - Parameter task: The work to run.
    /// - Returns: The string produced by `task`.
    /// - Throws: The error thrown by `task`.
    func execute(_ task: @escaping () async throws -> String) async throws -> String {
        return try await withCheckedThrowingContinuation { continuation in
            queue.append((task, continuation))
            // Kick the queue from a separate task so this closure returns right away.
            Task {
                await processQueue()
            }
        }
    }
    
    /// Runs the next queued task if a slot is free, then tries again.
    private func processQueue() async {
        // Nothing to do when all slots are taken or no work is waiting.
        guard running < concurrency, !queue.isEmpty else { return }
        
        running += 1
        let (task, continuation) = queue.removeFirst()
        
        // NOTE(review): while `task()` is suspended, other calls into this
        // actor can presumably interleave (actor reentrancy); that appears to
        // be what lets up to `concurrency` tasks overlap — confirm.
        do {
            let result = try await task()
            continuation.resume(returning: result)
        } catch {
            continuation.resume(throwing: error)
        }
        
        // Free the slot and immediately look for more queued work.
        running -= 1
        await processQueue()
    }
}

let executor = CustomExecutor(concurrency: 2)

/// Submits three sample tasks to the shared `executor` and prints their results.
///
/// Results are collected in completion order. If any task fails, the error is
/// printed instead of the results.
func runTasks() async {
    // Explicit element type: the closures are multi-statement, throwing and
    // handed to child tasks, so they have to be `@Sendable`.
    let tasks: [@Sendable () async throws -> String] = [
        { try await Task.sleep(nanoseconds: 1_000_000_000); return "任务 1" },
        { try await Task.sleep(nanoseconds: 500_000_000); return "任务 2" },
        { try await Task.sleep(nanoseconds: 800_000_000); return "任务 3" }
    ]

    do {
        // `executor.execute` can throw, so the children need a *throwing*
        // group; `try` inside a plain `withTaskGroup` child does not compile.
        let results = try await withThrowingTaskGroup(of: String.self) { group -> [String] in
            for task in tasks {
                group.addTask {
                    try await executor.execute(task)
                }
            }

            var collected: [String] = []
            for try await result in group {
                collected.append(result)
            }
            return collected
        }

        print("结果:", results)
    } catch {
        print("错误:", error)
    }
}

Task {
    await runTasks()
}
```
</UniversalEditor>

## 练习题

### 练习 1：并发数据处理

<UniversalEditor title="练习 1：并发数据处理" compare={true}>
```javascript !! js
// JavaScript 解答
class DataProcessor {
    constructor(concurrency = 3) {
        this.concurrency = concurrency;
        this.semaphore = concurrency;
    }
    
    async processItem(item) {
        await this.acquireSemaphore();
        try {
            // 模拟处理
            await new Promise(resolve => setTimeout(resolve, 100));
            return `已处理: ${item}`;
        } finally {
            this.releaseSemaphore();
        }
    }
    
    async acquireSemaphore() {
        while (this.semaphore <= 0) {
            await new Promise(resolve => setTimeout(resolve, 10));
        }
        this.semaphore--;
    }
    
    releaseSemaphore() {
        this.semaphore++;
    }
    
    async processBatch(items) {
        const tasks = items.map(item => this.processItem(item));
        return Promise.all(tasks);
    }
}

async function processDataConcurrently() {
    const processor = new DataProcessor(3);
    const items = Array.from({ length: 10 }, (_, i) => `项目 ${i + 1}`);
    
    console.log("开始并发处理...");
    const start = Date.now();
    
    const results = await processor.processBatch(items);
    
    const duration = Date.now() - start;
    console.log(`处理完成，耗时 ${duration}ms`);
    console.log("结果:", results);
}

processDataConcurrently();
```

```swift !! swift
// Swift 解答
/// Processes items concurrently while limiting how many are in flight at once.
///
/// The limit is enforced with a simple permit counter that callers poll for;
/// actor isolation keeps the counter free of data races.
actor DataProcessor {
    // Number of permits currently available.
    private var semaphore: Int
    // Initial number of permits, kept for reference.
    private let maxConcurrency: Int

    init(concurrency: Int = 3) {
        self.maxConcurrency = concurrency
        self.semaphore = concurrency
    }

    /// Processes one item after obtaining a permit.
    ///
    /// - Parameter item: The value to process.
    /// - Returns: The processed description of `item`.
    func processItem(_ item: String) async -> String {
        await acquireSemaphore()
        // `releaseSemaphore()` is synchronous and isolated to this actor, so it
        // can be called directly. Wrapping it in `Task { ... }` would return
        // the permit at some later point, after this method has already
        // returned, leaving waiters polling longer than necessary.
        defer { releaseSemaphore() }

        // Simulated work; an error thrown by the sleep is ignored.
        try? await Task.sleep(nanoseconds: 100_000_000)
        return "已处理: \(item)"
    }

    /// Polls every 10 ms until a permit is available, then takes it.
    private func acquireSemaphore() async {
        while semaphore <= 0 {
            try? await Task.sleep(nanoseconds: 10_000_000)
        }
        // No suspension point between the loop check and this decrement, so no
        // other caller can take the permit in between.
        semaphore -= 1
    }

    /// Returns a permit.
    private func releaseSemaphore() {
        semaphore += 1
    }

    /// Processes all items in child tasks and returns the results in
    /// completion order.
    ///
    /// - Parameter items: Values to process; an empty array yields an empty result.
    func processBatch(_ items: [String]) async -> [String] {
        await withTaskGroup(of: String.self) { group in
            for item in items {
                group.addTask {
                    await self.processItem(item)
                }
            }

            var results: [String] = []
            for await result in group {
                results.append(result)
            }
            return results
        }
    }
}

func processDataConcurrently() async {
    let processor = DataProcessor(concurrency: 3)
    let items = (1...10).map { "项目 \($0)" }
    
    print("开始并发处理...")
    let start = Date()
    
    let results = await processor.processBatch(items)
    
    let duration = Date().timeIntervalSince(start) * 1000
    print("处理完成，耗时 \(Int(duration))ms")
    print("结果:", results)
}

Task {
    await processDataConcurrently()
}
```
</UniversalEditor>

### 练习 2：实时数据流处理

<UniversalEditor title="练习 2：实时数据流" compare={true}>
```javascript !! js
// JavaScript 解答
/**
 * Emits a data record roughly every 100 ms to all subscribers until stopped.
 */
class DataStream {
    constructor() {
        this.subscribers = new Set();
        this.isRunning = false;
    }

    /**
     * Registers a subscriber.
     * @param {(data: {id: number, timestamp: number, value: number}) => (void|Promise<void>)} callback
     *        invoked for every record; may be synchronous or asynchronous
     * @returns {() => boolean} function that removes the subscriber again
     */
    subscribe(callback) {
        this.subscribers.add(callback);
        return () => this.subscribers.delete(callback);
    }

    /**
     * Runs the emit loop; resolves once `stop()` has been called and the
     * current iteration has finished.
     */
    async start() {
        this.isRunning = true;
        let counter = 0;

        while (this.isRunning) {
            const data = { id: counter++, timestamp: Date.now(), value: Math.random() };

            // Notify all subscribers. Each callback runs inside a promise chain
            // so that a synchronous subscriber (one that returns no promise) or
            // a subscriber that throws synchronously is handled exactly like an
            // async one, instead of crashing the loop with a TypeError.
            const promises = Array.from(this.subscribers).map(callback =>
                Promise.resolve()
                    .then(() => callback(data))
                    .catch(error => console.error("订阅者错误:", error))
            );

            await Promise.all(promises);
            await new Promise(resolve => setTimeout(resolve, 100));
        }
    }

    /** Asks the emit loop to finish after its current iteration. */
    stop() {
        this.isRunning = false;
    }
}

class DataProcessor {
    constructor() {
        this.processedCount = 0;
    }
    
    async process(data) {
        await new Promise(resolve => setTimeout(resolve, 50));
        this.processedCount++;
        
        if (data.value > 0.8) {
            console.log(`检测到高值: ${data.value}`);
        }
        
        return `已处理 ${this.processedCount}: ${data.value}`;
    }
    
    getStats() {
        return { processedCount: this.processedCount };
    }
}

async function runDataStream() {
    const stream = new DataStream();
    const processor = new DataProcessor();
    
    // 订阅流
    const unsubscribe = stream.subscribe(async (data) => {
        const result = await processor.process(data);
        console.log(result);
    });
    
    // 启动流
    const streamTask = stream.start();
    
    // 2 秒后停止
    setTimeout(() => {
        stream.stop();
        console.log("最终统计:", processor.getStats());
    }, 2000);
    
    await streamTask;
}

runDataStream();
```

```swift !! swift
// Swift 解答
/// Emits a `Data` record roughly every 100 ms to all subscribers until stopped.
actor DataStream {
    private var subscribers: [(Data) async -> Void] = []
    private var isRunning = false

    /// Registers a subscriber that is invoked for every emitted record.
    func subscribe(_ callback: @escaping (Data) async -> Void) {
        subscribers.append(callback)
    }

    /// Runs the emit loop; returns once `stop()` has been called and the
    /// current iteration has finished.
    func start() async {
        isRunning = true
        var counter = 0

        while isRunning {
            // `Double.random()` does not exist; a range is required. `0..<1`
            // matches the [0, 1) range of JavaScript's `Math.random()`.
            let data = Data(id: counter, timestamp: Date(), value: Double.random(in: 0..<1))
            counter += 1

            // Notify all subscribers concurrently; the group waits for every
            // child task before the closure returns.
            await withTaskGroup(of: Void.self) { group in
                for subscriber in subscribers {
                    group.addTask {
                        await subscriber(data)
                    }
                }
            }

            // Pause between records; an error thrown by the sleep is ignored.
            try? await Task.sleep(nanoseconds: 100_000_000)
        }
    }

    /// Asks the emit loop to finish after its current iteration.
    func stop() {
        isRunning = false
    }
}

/// One record emitted by the data stream.
///
/// Declared exactly once: a second identical `struct Data` in the same scope
/// is an invalid redeclaration and stops the snippet from compiling.
///
/// NOTE(review): this name is the same as Foundation's `Data`; inside this
/// module the local type should take precedence, but a more specific name
/// would avoid confusion — confirm before renaming, since other snippets
/// refer to `Data`.
struct Data {
    /// Sequential record number.
    let id: Int
    /// Time at which the record was created.
    let timestamp: Date
    /// Sample value carried by the record.
    let value: Double
}

/// Processes stream records one by one and counts how many it has handled.
actor DataProcessor {
    /// Snapshot of the processor's counters.
    ///
    /// A named type is used because Swift does not allow a single-element
    /// tuple with a label, so `(processedCount: Int)` is not a valid return type.
    struct Stats {
        let processedCount: Int
    }

    private var processedCount = 0

    /// Simulates 50 ms of work for `data`, bumps the counter and reports the result.
    ///
    /// - Parameter data: The record to process.
    /// - Returns: A message with the running count and the record's value.
    func process(_ data: Data) async -> String {
        // Simulated work; an error thrown by the sleep is ignored.
        try? await Task.sleep(nanoseconds: 50_000_000)
        processedCount += 1

        if data.value > 0.8 {
            print("检测到高值: \(data.value)")
        }

        return "已处理 \(processedCount): \(data.value)"
    }

    /// Returns the number of records processed so far.
    func getStats() -> Stats {
        return Stats(processedCount: processedCount)
    }
}

/// Wires a `DataProcessor` to a `DataStream`, lets the stream run for about
/// two seconds, then stops it and prints the processor's statistics.
func runDataStream() async {
    let stream = DataStream()
    let processor = DataProcessor()
    
    // Subscribe to the stream: every record is processed and the result printed.
    await stream.subscribe { data in
        let result = await processor.process(data)
        print(result)
    }
    
    // Start the stream as a child task so this function can keep going.
    async let streamTask = stream.start()
    
    // Stop after 2 seconds; an error thrown by the sleep is ignored.
    try? await Task.sleep(nanoseconds: 2_000_000_000)
    await stream.stop()
    
    let stats = await processor.getStats()
    print("最终统计:", stats)
    
    // Wait for the emit loop to observe the stop request and return.
    await streamTask
}

Task {
    await runDataStream()
}
```
</UniversalEditor>

## 关键要点

### Swift 并发优势
1. **结构化并发**：自动任务生命周期管理
2. **数据竞争预防**：Actors 提供编译时安全
3. **取消**：内置取消支持
4. **性能**：高效的任务调度和执行
5. **内存安全**：并发代码的自动内存管理
6. **集成**：与现有代码的无缝集成

### 与 JavaScript 的关键差异
1. **任务管理**：结构化 vs 手动任务管理
2. **数据安全**：Actors vs 手动同步
3. **取消**：自动 vs 手动取消
4. **性能**：原生 vs 基于事件循环的执行
5. **内存安全**：编译时 vs 运行时安全
6. **集成**：原生 vs 基于回调的集成

### 最佳实践
1. **使用结构化并发**进行任务管理
2. **利用 actors**处理共享可变状态
3. **在长时间运行的任务中实现适当的取消**
4. **使用任务组**进行并发操作
5. **优先使用异步序列**处理数据流
6. **在并发代码中适当处理错误**

### 下一步
在下一个模块中，我们将探索 Swift 的内存管理和性能优化技术，包括 ARC、值类型和性能分析，将它们与 JavaScript 的垃圾回收和优化策略进行对比。 